Amazon MSK 基于 S3 的数据导出、导入、备份、还原、迁移方案

您所在的位置:网站首页 Amazon s3迁移 Amazon MSK 基于 S3 的数据导出、导入、备份、还原、迁移方案

Amazon MSK 基于 S3 的数据导出、导入、备份、还原、迁移方案

2024-05-29 17:17| 来源: 网络整理| 查看: 265

Amazon MSK(Amazon Managed Streaming for Apache Kafka)是 Amazon 云平台提供的托管 Kafka 服务。在系统升级或迁移时,用户常常需要将一个 Amazon MSK 集群中的数据导出(备份),然后在新集群或另一个集群中再将数据导入(还原)。通常,Kafka 集群间的数据复制和同步多采用 Kafka MirrorMaker,但是,在某些场景中,受环境限制,两个于 Kafka 集群之间的网络可能无法连通,或者两个 AWS 账号相互隔离,亦或是需要将 Kafka 的数据沉淀为文件存储以备他用。此时,基于 Kafka Connect S3 Source / Sink Connector 的方案会是一种较为合适的选择,本文就将介绍一下这一方案的具体实现。

数据的导出、导入、备份、还原通常都是一次性操作,为此搭建完备持久的基础设施并无太大必要,省时省力,简单便捷才是优先的考量因素。为此,本文将提供一套开箱即用的解决方案,方案使用 Docker 搭建 Kafka Connect,所有操作均配备自动化 Shell 脚本,用户只需设置一些环境变量并执行相应脚本即可完成全部工作。这种基于 Docker 的单体模式可以应对中小型规模的数据同步和迁移,如果要寻求稳定、健壮的解决方案,可以考虑将 Docker 版本的 Kafka Connect 迁移到 Kubernetes 或 Amazon MSK Connect,实现集群化部署。

1. 整体架构

首先介绍一下方案的整体架构。导出/导入和备份/还原其实是两种高度类似的场景,但为了描述清晰,我们还是分开讨论。先看一下导出/导入的架构示意图:

图 1 MSK 集群间的数据导出/导入

在这个架构中,Source 端的 MSK 是数据流的起点,安装了 S3 Sink Connector 的 Kafka Connect 会从 Source 端的 MSK 中提取指定 Topic 的数据,然后以 Json 或 Avro 文件的形式存储到 S3 上;同时,另一个安装了 S3 Source Connector 的 Kafka Connect 会从 S3 上读取这些 Json 或 Avro 文件,然后写入到 Sink 端 MSK 的对应 Topic 中。如果 Source 端和 Sink 端的 MSK 集群不在同一个 Region,可以在各自的 Region 分别完成导入和导出,然后在两个 Region 之间使用 S3 的 Cross-Rejion Replication 进行数据同步。

该架构只需进行简单的调整,即可用于 MSK 集群的备份/还原,如下图所示:先将 MSK 集群的数据备份到 S3 上,待完成集群的升级、迁移或重建工作后,再从 S3 上将数据恢复到新建集群即可。

图 2 MSK 集群的数据备份/还原

本文将以图 1 所示的导出/导入架构为准给出完整的环境搭建说明和实操脚本,图 2 所示的备份/还原架构同样可以基于本文提供的指导和脚本实现。

2. 预设条件

本文聚焦于 Kafka Connect 的数据导出/导入和备份/还原操作,限于篇幅,无法详细介绍架构中每个组件的搭建和配置方法,因此有如下预设条件需读者在个人环境中提前准备:

① 一台基于 Amazon Linux2 的 EC2 实例(建议新建纯净实例),本文所有的实操脚本都将在该实例上执行,该实例也是运行 Kafka Connect Docker Container 的宿主机。

② 两个 MSK 集群,一个作为 Source,一个作为 Sink;如果只有一个 MSK 集群也可完成验证,该集群将既作 Source 又作 Sink。

③ 为聚焦 Kafka Connect S3 Source / Sink Connector 的核心配置,我们预设 MSK 集群没有开启身份认证(即认证类型为 Unauthenticated),数据传输方式为 PLAINTEXT,以便简化 Kafka Connect 的连接配置。

④ 网络连通性上要求 EC2 实例能访问 S3、Source 端 MSK 集群、Sink 端 MSK 集群 。如果在实际环境中无法同时连通 Source 端和 Sink 端,则可以在两台分属于不同网络的 EC2 上进行操作,但它们必须都能访问 S3。如果是跨 Region 或账号隔离,则另需配置 S3 Cross-Region Replication 或手动拷贝数据文件。 # 3. 全局配置

由于实际操作将不可避免地依赖到具体的 AWS 账号以及本地环境里的各项信息(如 AKSK,服务地址,各类路径,Topic 名称等),为了保证本文给出的操作脚本具有良好的可移植性,我们将所有与环境相关的信息抽离出来,以全局变量的形式在实操前集中配置。以下就是全局变量的配置脚本,读者需要根据个人环境设定这些变量的取值:

# account-specific configs export REGION="" export S3_BUCKET="" export AWS_ACCESS_KEY_ID="" export AWS_SECRET_ACCESS_KEY="" export SOURCE_KAFKA_BOOTSTRAP_SEVERS="" export SINK_KAFKA_BOOTSTRAP_SEVERS="" # kafka topics import and export configs export SOURCE_TOPICS_LIST="" export SINK_TOPICS_LIST="" export TOPIC_REGEX_LIST="" export SOURCE_TOPICS_REGEX="" export SINK_TOPICS_REPLACEMENT=""

为了便于演示和解读,本文将使用下面的全局配置,其中前 6 项配置与账号和环境强相关,仍需用户自行修改,脚本中给出的仅为示意值,而后 5 项配置与 MSK 数据的导入导出息息相关,不建议修改,因为后续的解读将基于这里设定的值展开,待完成验证后,您可再根据需要灵活修改后 5 项配置以完成实际的导入导出工作。

回到操作流程,登录准备好的 EC2 实例,修改下面脚本中与账号和环境相关的前 6 项配置,然后执行修改后的脚本。此外,需要提醒注意的是:在后续操作中,部分脚本执行后将不再返回,而是持续占用当前窗口输出日志或 Kafka 消息,因此需要新开命令行窗口,每次新开窗口都需要执行一次这里的全局配置脚本。

# 实操步骤(1): 全局配置 # account and environment configs export REGION="us-east-1" export S3_BUCKET="source-topics-data" export AWS_ACCESS_KEY_ID="ABCDEFGHIGKLMNOPQRST" export AWS_SECRET_ACCESS_KEY="abcdefghigklmnopqrstuvwxyz0123456789" export SOURCE_KAFKA_BOOTSTRAP_SEVERS="b-1.cluster1.6ww5j7.c1.kafka.us-east-1.amazonaws.com:9092" export SINK_KAFKA_BOOTSTRAP_SEVERS="b-1.cluster2.2au4b8.c2.kafka.us-east-1.amazonaws.com:9092" # kafka topics import and export configs export SOURCE_TOPICS_LIST="source-topic-1,source-topic-2" export SINK_TOPICS_LIST="sink-topic-1,sink-topic-2" export TOPIC_REGEX_LIST="source-topic-1:.*,source-topic-2:.*" export SOURCE_TOPICS_REGEX="source-topic-(\\\d)" # to be resolved to "source-topic-(\\d)" in json configs export SINK_TOPICS_REPLACEMENT="sink-topic-\$1" # to be resolved to "sink-topic-$1" in json configs

关于上述脚本中的后 5 项配置,有如下详细说明:

配置项 样值 说明 SOURCE_TOPICS_LIST source-topic-1,source-topic-2 该值将赋给 S3 Sink Connector 的 topics 配置项,该配置用于指明要被导出的 Topic 列表(使用逗号分隔) SINK_TOPICS_LIST sink-topic-1,sink-topic-2 该值是 Sink 端与 Source Topics 一一对应的 Sink Topics 列表(使用逗号分隔),但它并不会出现在 S3 Sink Connector 的配置中,因为 S3 Sink Connector 可从 S3 的目录结构中获知存在哪些 Source 端的 Topic,而 Sink 端的 Topic 名称是在 Source 端 Topic 名称基础上使用正则表达式映射出来的,该值仅应用在创建 Sink 端的 Topic 的脚本中(备注:技术上是可以不设置该变量的,它的值可从SOURCE_TOPICS_LIST、TOPIC_REGEX_LIST、SINK_TOPICS_REPLACEMENT解析出来,但是这样会增加脚本的复杂度,给读者阅读和理解脚本造成不便) TOPIC_REGEX_LIST source-topic-1:.*,source-topic-2:.* 该值将赋给 S3 Source Connector 的 topic.regex.list 配置项,它的格式是:,:,…,该配置的作用是告诉 S3 Source Connector 每一个 Topic 对应的哪些文件是数据文件,正则表达式用于匹配文件名(需要注意的是:正则表达式并不会用于匹配文件的中间路径,中间路径(例如partition=0) 是由配置项 partitioner.class 控制的, S3 Source Connector 必须使用和 S3 Sink Connector 一致的 Patitioner 才能正确匹配文件路径 SOURCE_TOPICS_REGEX source-topic-(\\) 该值将赋给 S3 Source Connector 的 transforms.xxx.regex 配置项,它是 Source 端 MSK 集群上所有 Topic 的正则表达式,该项值通常都会出现正则分组(group),与之关联的SINK_TOPICS_REPLACEMENT表达式将会引用这些分组映射成 Sink 端的目标Topic SINK_TOPICS_REPLACEMENT sink-topic-\$1 该值将赋给 S3 Source Connector 的 transforms.xxx.replacement 配置项,它是 Sink 端 MSK 集群上所有 Topic 的正则表达式,它通常会引用SOURCE_TOPICS_REGEX中的正则分组以便映射到 Sink 端的目标 Topic 上

我们就以脚本中设定的值为例,解读一下这 5 项配置联合起来将要实现的功能,同时也是本文将演示的主要内容:

在 Source 端的 MSK 集群上存在两个名为 source-topic-1 和 source-topic-2 的Topic,通过安装有 S3 Sink Connector 的 Kafka Connect (Docker 容器)将两个 Topic 的数据导出到 S3 的指定存储桶中,然后再通过安装有 S3 Source Connector 的 Kafka Connect (Docker 容器,可以和 S3 Source Connector 共存为一个Docker 容器)将 S3 存储桶中的数据写入到 Sink 端的 MSK 集群上,其中原source-topic-1 的数据将被写入 sink-topic-1,原 source-topic-2 的数据将被写入 sink-topic-2。

特别地,如果是备份/还原场景,需要保持导出/导入的 Topic 名称一致,此时,可直接删除 S3 Source Connector 中以 transforms 开头的 4 项配置(将在下文中出现),或者将下面两项改为:

export SOURCE_TOPICS_REGEX=".*" export SINK_TOPICS_REPLACEMENT="\$0"

如果您只有一个 MSK 集群,同样可以完成本文的验证工作,只需将 SOURCE_KAFKA_BOOTSTRAP_SEVERS 和 SINK_KAFKA_BOOTSTRAP_SEVERS 同时设置为该集群即可,这样,该集群既是 Source 端又是 Sink 端,由于配置中的 Source Topics 和 Sink Topics 并不同名,所以不会产生冲突。

4. 环境准备 4.1 安装工具包

在 EC2 上执行以下脚本,安装并配置 jq,yq,docker,jdk,kafka-console-client 五个必须的软件包,您可以根据自身 EC2 的情况酌情选择安装全部或部分软件。建议使用纯净的 EC2 实例,完成全部的软件安装:

# 实操步骤(2): 安装工具包 # install jq sudo yum -y install jq jq --version # install yq sudo wget https://github.com/mikefarah/yq/releases/download/v4.35.1/yq_linux_amd64 -O /usr/bin/yq sudo chmod a+x /usr/bin/yq yq --version # install docker sudo yum -y install docker # enable & start docker sudo systemctl enable docker sudo systemctl start docker sudo systemctl status docker # configure docker, add current user to docker user group # and refresh docker group to take effect immediately sudo usermod -aG docker $USER newgrp docker docker --version # install docker compose dockerConfigDir=${dockerConfigDir:-$HOME/.docker} mkdir -p $dockerConfigDir/cli-plugins wget "https://github.com/docker/compose/releases/download/v2.20.3/docker-compose-$(uname -s)-$(uname -m)" -O $dockerConfigDir/cli-plugins/docker-compose chmod a+x $dockerConfigDir/cli-plugins/docker-compose docker compose version # install jdk sudo yum -y install java-1.8.0-openjdk-devel # configure jdk sudo tee /etc/profile.d/java.sh


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3